package me.seawenc.db.migration.service;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import me.seawenc.db.migration.bean.DatabaseBean;
import me.seawenc.db.migration.bean.ExecParameters;
import me.seawenc.db.migration.bean.TableBean;
import me.seawenc.db.migration.bean.table.TableFieldBean;
import me.seawenc.db.migration.dbengine.DbType;
import me.seawenc.db.migration.helper.ExecTimeTake;
import me.seawenc.db.migration.helper.FileHelper;
import me.seawenc.db.migration.helper.LocalCmdHelper;
import me.seawenc.db.migration.helper.Optionalx;
import me.seawenc.db.migration.helper.YmlHelper;
import me.seawenc.db.migration.helper.LogDataxHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import static me.seawenc.db.migration.helper.LocalCmdHelper.LOG_TYPE_ERROR;

public class DataxService {
    Logger logger = LogManager.getLogger(DataxService.class);
    private final ExecParameters parameters;
    private final DatabaseBean srcDatabase;
    private final DatabaseBean targetDatabase;

    // 执行过程中的日志 ,key=表名，value=日志
    private Map<String,List<LocalCmdHelper.ConsoleLogBean>> execLogs=new HashMap<>();

    private String tmpPath;
    private String pythonPath;
    private String dataxPath;
    private String dataxConfig;

    private boolean skipError;
    ExecTimeTake timeTake;

    LogDataxHelper loggerHelper;
    String logLevel;
    public DataxService(ExecParameters parameters, DatabaseBean srcDatabase, DatabaseBean targetDatabase) throws Exception {
        this.parameters=parameters;
        this.srcDatabase=srcDatabase;
        this.targetDatabase=targetDatabase;
        tmpPath = YmlHelper.getConfig(parameters.getConfigPath(),"migration.tempFilePath", "tmp/db-migrations/");
        pythonPath = YmlHelper.getConfig(parameters.getConfigPath(),"migration.datax.pythonPath", "python");
        dataxPath = YmlHelper.getConfig(parameters.getConfigPath(),"migration.datax.dataxPath", "bin/datax.py");
        skipError = Boolean.parseBoolean(YmlHelper.getConfig(parameters.getConfigPath(),"migration.datax.skip-error", "false"));
        logLevel = YmlHelper.getConfig(parameters.getConfigPath(),"migration.datax.log-level", "ALL").toUpperCase();
        dataxConfig = createDataxConfig();
        // 清空tmpPath下所有文件
        File file = new File(tmpPath);
        file.mkdirs();
        for (File f : file.listFiles()) {
            Optionalx.ifThen(!f.delete(), () -> logger.warn("删除文件失败:{}", f.getAbsolutePath()));
        }
        logger.info("删除临时文件完成");
    }

    public void dataxMigration() throws Exception {
        //打印需要迁移的汇总信息
        printMigrationBeginInfo();
        timeTake =new ExecTimeTake();
        try {
            for (int i = 0; i < srcDatabase.getDbTables().size(); i++){

                final TableBean tableBean = srcDatabase.getDbTables().get(i);
                // 1. 初化执行环境，包含：1.创建datax配置文件，2.初始化打印日志
                String dataxJsonPath =initDataxAndRetJsonPath(tableBean);
                // 2. 执行datax命令
                LocalCmdHelper.executeCommand(String.format("%s %s %s",pythonPath,dataxPath,dataxJsonPath),(log)-> logCollect(log,tableBean));
                // 3. 判断执行结果,如果出错，则退出
                assertExecResult(tableBean.getTableName());
                // 4. 关闭相关资源
                endWork(i,tableBean);
            }
        } catch (Exception e) {
            logger.error("执行数据迁移出错", e);
        }
        // 打印迁移汇总信息
        printMigrationSummary();
    }

    private void endWork(int i,TableBean tableBean) throws InterruptedException {
        List<LocalCmdHelper.ConsoleLogBean> logs = execLogs.get(tableBean.getTableName());
        String status=Optionalx.isPresent(logs)?(workHasSuccess(logs)?"\u001B[1;32m成功\u001B[0m":"\u001B[1;31m失败\u001B[0m"):"跳过";
        logger.info(String.format("\u001B[1;32m 数据迁移当前进度:%s/%s,当前表 %s \u001B[0m 迁移已 %s",i+1, srcDatabase.getDbTables().size(),tableBean.getTableName(),status));
        // sleep一下，不然后续看不到
        Thread.sleep(1000);
        loggerHelper.closeLogFile();
    }

    private void printMigrationSummary() {
        logger.info("迁移汇总信息");

        long successNum = execLogs.entrySet().stream().filter(l -> workHasSuccess(l.getValue())).count();
        String status=execLogs.size()==successNum?"\u001B[32m成功\u001B[0m":"\u001B[31m失败\u001B[0m";

        logger.info(String.format("数据数据迁移%s,总耗时:%s,完成数:\u001B[32m %s \u001B[0m,失败数:\u001B[31m %s \u001B[0m,总数:%s,以下为明细信息:",status,timeTake.getFormat(),successNum,execLogs.size()-successNum,srcDatabase.getDbTables().size()));
        // 先打印设置表头(中文表头无法对齐)
        logger.info(String.format("\033[1;30;42m|%3s| %31s | %2s | %7s | %8s | %6s | %13s | %13s | %10s | %6s |\033[0m","#","表名","状态","启动时刻","结束时刻","耗时","平均流量","写入速度","记录总数","失败总数"));
        for (int i = 0; i < srcDatabase.getDbTables().size(); i++) {
            TableBean table=srcDatabase.getDbTables().get(i);
            findTableExecSummary(i,table);
        }
        logger.warn(String.format("若有失败的表，请到日志中查看具体失败原因，日志目录为：%s{tablename}.json.log",tmpPath));
    }

    private void findTableExecSummary(int num, TableBean table) {
        List<LocalCmdHelper.ConsoleLogBean> logs = execLogs.get(table.getTableName());
        if(logs==null){
            logs=new ArrayList<>();
        }
        String status=logs.size()>0?(workHasSuccess(logs)?"\u001B[32m成功\u001B[0m":"\u001B[31m失败\u001B[0m"):"跳过";

        String beginTime = findDataxSummaryData(logs, "任务启动时刻");
        String endTime = findDataxSummaryData(logs, "任务结束时刻");
        String take = findDataxSummaryData(logs, "任务总计耗时");
        String liuliang = findDataxSummaryData(logs, "任务平均流量");
        String speed = findDataxSummaryData(logs, "记录写入速度");
        String totalNum = findDataxSummaryData(logs, "读出记录总数");
        String errorNum = findDataxSummaryData(logs, "读写失败总数");
        // "表名","状态","启动时刻","结束时刻","耗时","平均流量","写入速度","记录总数","失败总数"
        logger.info(String.format("|%3s| %32s | %4s | %10s | %10s | %8s | %16s | %16s | %12s | %8s |",num,table.getTableName(),status,beginTime,endTime,take,liuliang,speed,totalNum,errorNum));
    }

    /**
     * 从以下日志中找到最后的关键数字
     *
     * 任务启动时刻                    : 2024-01-09 10:31:35
     * 任务结束时刻                    : 2024-01-09 10:32:06
     * 任务总计耗时                    :                 31s
     * 任务平均流量                    :               44B/s
     * 记录写入速度                    :              0rec/s
     * 读出记录总数                    :                  15
     * 读写失败总数                    :                   0
     * @param key
     * @return
     */
    private String findDataxSummaryData(List<LocalCmdHelper.ConsoleLogBean> logs,String key){
        Optional<LocalCmdHelper.ConsoleLogBean> line = logs.stream().filter(l -> l.ctn.contains(key)).findFirst();
        if(!line.isPresent()){
            return "-";
        }
        String ctn = line.get().ctn;
        // 以下代码为兼容只取时间，不要日期,例： 2024-01-09 10:31:35 只取 -> 10:31:35
        ctn = ctn.substring(ctn.indexOf(":") + 1).trim();
        String[] split = ctn.split(" ");
        return split[split.length-1];
    }

    private void printMigrationBeginInfo() throws Exception {
        List<String> names = srcDatabase.getDbTables().stream().map(t->t.getTableName()).collect(Collectors.toList());
        String namesStr = String.format("\u001B[1;32m %s \u001B[0m",names.stream().collect(Collectors.joining(",")));
        logger.info(String.format("3秒后进行数据迁移，本次迁移表\u001B[1;31m总数为：%s \u001B[0m,明细：%s",names.size(),namesStr));
        // 等待3秒，不然看不到上面的关键信息
        Thread.sleep(1000*3);
    }

    private void assertExecResult(String tableName) {
        List<LocalCmdHelper.ConsoleLogBean> logs = execLogs.get(tableName);
        if(logs==null||logs.size()==0){
            logger.error(tableName+":数据迁移出错!");
            if(!skipError){
                throw new RuntimeException("数据迁移出错,退出");
            }
            return ;
        }
        boolean hasError = workHasSuccess(logs);

        if(!hasError){
            logger.error(tableName+":数据迁移出错，最后20行日志：");
            for (int i = Math.max(0,logs.size()-20); i < logs.size(); i++){
                logger.error(logs.get(i).ctn);
            }
            if(!skipError){
                throw new RuntimeException("数据迁移出错,退出");
            }
        }
    }

    private static boolean workHasSuccess(List<LocalCmdHelper.ConsoleLogBean> logs) {
        if(logs==null||logs.size()==0){
            return false;
        }
        List<LocalCmdHelper.ConsoleLogBean> lastLogs = logs.subList(Math.min(20, logs.size()-1), logs.size()-1);
        // 若最后20行日志中包含错误信息，则认为有错误
        return lastLogs.stream().anyMatch(l-> l.ctn.contains("读写失败总数                    :"));
    }

    private String initDataxAndRetJsonPath(TableBean tableBean) throws IOException {
        Optional<String> idFieldOptional = tableBean.getFields().stream().filter(f -> "Y".equals(f.getPriKey())).map(f -> f.getName()).findFirst();
        String idField = idFieldOptional.isPresent()?idFieldOptional.get():"";

        String condition = YmlHelper.getConfig(parameters.getConfigPath(), String.format("migration.datax.condition.%s", tableBean.getTableName()), "");
        // 将文件写入临时文件
        final String dataxJsonPath = String.format("%s%s.json", tmpPath, tableBean.getTableName());

        logger.info(String.format("本次将要执行的迁移sql为：\u001B[1;32m select * from %s where 1=1\u001B[0m\u001B[1;31m %s \u001B[0m",tableBean.getTableName(),condition));

        // 2. 替换配置文件中的占位符
        String currentConfig = dataxConfig.replace("_tableName_", tableBean.getTableName())
                                          .replace("_condition_", condition)
                                          .replace("_splitPk_",Optionalx.ofByDefGet(idField,""));

        // 3.将datax配置写入外部文件
        FileHelper.writeToExtFile(dataxJsonPath, currentConfig);
        // 初始化执行日志文件
        loggerHelper=new LogDataxHelper(dataxJsonPath+".log");

        return dataxJsonPath;
    }

    private String formatColumnName(DbType dbType, TableFieldBean f) {
        //mysql需要加反引号
        if(DbType.MYSQL.equals(dbType)){
            return String.format("`%s`",f.getName());
        }
        return String.format("\"%s\"",f.getName());
    }

    private void logCollect(LocalCmdHelper.ConsoleLogBean log, TableBean tableBean) {
        String tableName = tableBean.getTableName();
        execLogs.computeIfAbsent(tableName,(t)-> new ArrayList<>()).add(log);
        if(Optionalx.isNotPresent(log.ctn)){
            return;
        }
        /**
         * 文件 日志
         */
        //日志 文件 全部都写
        Optionalx.ifThen(log.isInfoLog(),()-> loggerHelper.info(log.ctn));
        Optionalx.ifThen(log.isErrorLog(),()-> loggerHelper.error(log.ctn));

        /**
         * 控制台 日志
         */
        if("ALL".contains(logLevel)){
            // 如果配置文件中为打印所有日志，
            Optionalx.ifThen(log.isInfoLog(),()-> logger.info(log.ctn));
            Optionalx.ifThen(log.isErrorLog(),()-> logger.error(log.ctn));
            return;
        }

        /**
         *  以下为 只打印关键日志
         */
        if((log.type.equals(LOG_TYPE_ERROR)
                || log.ctn.contains(" WARN ")
                || log.ctn.contains(" ERROR "))
                && !log.ctn.contains("您的配置文件中的列配置信息存在风险")){
            Optionalx.ifThen(log.isInfoLog(),()-> logger.info(log.ctn));
            Optionalx.ifThen(log.isErrorLog(),()-> logger.error(log.ctn));
            return;
        }
        if(log.ctn.contains("INFO  StandAloneJobContainerCommunicator")){
            Optionalx.ifThen(log.isInfoLog(),()-> logger.info(String.format("表:%s迁移进度：%s",tableName,log.ctn.split("StandAloneJobContainerCommunicator -")[1])));
            return;
        }

        if(log.ctn.contains("任务启动时刻")||log.ctn.contains("任务结束时刻")
            ||log.ctn.contains("任务总计耗时")||log.ctn.contains("任务平均流量")
            ||log.ctn.contains("记录写入速度")||log.ctn.contains("读出记录总数")
            ||log.ctn.contains("读写失败总数")){
            Optionalx.ifThen(log.isInfoLog(),()-> logger.info(log.ctn));
            Optionalx.ifThen(log.isErrorLog(),()-> logger.error(log.ctn));
        }
    }

    private String createDataxConfig() throws Exception {
        String dataxTemplate = FileHelper.readExtFile("conf/datax/migration-temp.json").stream().collect(Collectors.joining("\n"));

        String readerFilePath = String.format("conf/datax/reader/%s.json", srcDatabase.getDbType().lname());
        String dataxReader = getDataxTemp(readerFilePath,"migration.src.jdbc.");

        String writerFilePath = String.format("conf/datax/writer/%s.json", targetDatabase.getDbType().lname());
        String dataxWriter = getDataxTemp(writerFilePath,"migration.target.jdbc.");

        return dataxTemplate.replace("\"_reader_\"",dataxReader).replace("\"_writer_\"",dataxWriter);
    }

    private String getDataxTemp(String srcDb,String jdbcConfKey) throws Exception {
        String temp=FileHelper.readExtFile(srcDb).stream().collect(Collectors.joining("\n"));
        Properties jdbcProps = YmlHelper.yamlFile2PropsFilter(parameters.getConfigPath(), jdbcConfKey);
        String jdbcUrl = jdbcProps.getProperty("url");
        String jdbcUsername = jdbcProps.getProperty("user");
        String jdbcPassword = jdbcProps.getProperty("pwd");
        return temp.replace("_jdbcUrl_", jdbcUrl)
                .replace("_userName_", jdbcUsername)
                .replace("_password_", jdbcPassword);
    }


}
